package com.gvsoft.communication.net;

import com.gvsoft.communication.net.status.DefaultSocketStatusHandle;
import com.gvsoft.communication.net.status.ISocketStatusHandle;
import com.gvsoft.communication.net.gather.ConcurrentGather;
import com.gvsoft.communication.net.model.IOrder;
import com.gvsoft.communication.net.model.IPacket;
import com.gvsoft.communication.common.model.packet.Packet;
import com.gvsoft.communication.net.adapter.IOAdapter;
import com.gvsoft.communication.net.bufferpool.ReadBufferPool;
import com.sun.corba.se.pept.transport.ByteBufferPool;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Connection class: holds the socket channel together with the channel's last access time.
 *
 * <p>An instance also carries the selection key, the queue of pending outbound buffers,
 * the adapter used to turn bytes into messages (and orders into bytes), and a
 * {@link ClientStatus} whose transitions are reported to an {@link ISocketStatusHandle}.
 *
 * <p>ProjectName: gvMsgRouting<br>
 * Author: zhaoqiubo<br>
 * Date: 15/8/5
 */
public class NSocket implements Closeable {

    private static final Logger logger = LogManager.getLogger("net");

    /** Capacity, in bytes, of the buffer allocated for each outgoing order. */
    private static final int WRITE_BLOCK_SIZE = 8192;

    /** Capacity requested when the read accumulation buffer is first allocated. */
    private static final int READ_BUFFER_SIZE = 8192;

    /**
     * Client status values.
     */
    public enum ClientStatus {
        ACTIVE, CLOSED, RECONNECT, NEW, CONNECTED
    }

    private String token;
    private String userId;
    private SocketChannel channel;
    private int lastAccessTime;
    private SelectionKey key;

    /** Queue of buffers waiting to be written to the channel. */
    public final LinkedBlockingQueue<ByteBuffer> writeQ = new LinkedBlockingQueue<>();

    private Dispatcher dispatcher;

    /** Adapter that parses incoming bytes and serialises outgoing orders. */
    private IOAdapter ioAdapter;

    /** Accumulates bytes read from the channel that have not yet been parsed into a message. */
    private IoBuffer readBuffer;

    /** Handler notified after every status change. */
    private ISocketStatusHandle statusHandle = new DefaultSocketStatusHandle();

    /** Current client status; stays {@code null} until one of the {@code setXxx()} status methods runs. */
    private ClientStatus status;

    /**
     * Serial number of the client's login request. Per the original author's note, a response
     * carrying this serial number means the server has finished handling the login request.
     */
    private String lrid = "";

    /** Position inside the head buffer of {@link #writeQ} at which a partial write stopped. */
    private int writePos = 0;

    /**
     * Creates an empty socket wrapper. Not recommended for use.
     */
    public NSocket() {
    }

    /**
     * Creates a wrapper that only knows its user id.
     *
     * @param userId id of the user this connection belongs to
     */
    public NSocket(String userId) {
        this.userId = userId;
    }

    /**
     * Creates a wrapper around an existing channel with an explicit status handler.
     *
     * @param channel            the socket channel
     * @param lastAccessTime     initial last-access time
     * @param dispatcher         dispatcher that executes packet handling
     * @param ioAdapter          adapter used for parsing reads and serialising writes
     * @param socketStatusHandle handler notified of status changes; replaces the default handler
     */
    public NSocket(SocketChannel channel, int lastAccessTime, Dispatcher dispatcher, IOAdapter ioAdapter, ISocketStatusHandle socketStatusHandle) {
        this.channel = channel;
        this.lastAccessTime = lastAccessTime;
        this.dispatcher = dispatcher;
        this.ioAdapter = ioAdapter;
        this.statusHandle = socketStatusHandle;
    }

    /**
     * Creates a wrapper around an existing channel for a known user, keeping the default
     * status handler.
     *
     * @param userid     id of the user this connection belongs to
     * @param channel    the socket channel
     * @param dispatcher dispatcher that executes packet handling
     * @param ioAdapter  adapter used for parsing reads and serialising writes
     */
    public NSocket(String userid, SocketChannel channel, Dispatcher dispatcher, IOAdapter ioAdapter) {
        this.userId = userid;
        this.channel = channel;
        this.dispatcher = dispatcher;
        this.ioAdapter = ioAdapter;
    }

    /** @return the current status, or {@code null} if no status has been set yet */
    public ClientStatus getStatus() {
        return status;
    }

    /** @return {@code true} if the status is {@link ClientStatus#ACTIVE} */
    public boolean isActive() {
        return this.status == ClientStatus.ACTIVE;
    }

    /** @return {@code true} if the status is {@link ClientStatus#CLOSED} */
    public boolean isClosed() {
        return this.status == ClientStatus.CLOSED;
    }

    /** @return {@code true} if the status is {@link ClientStatus#RECONNECT} */
    public boolean isReconnect() {
        return this.status == ClientStatus.RECONNECT;
    }

    /** @return {@code true} if the status is {@link ClientStatus#NEW} */
    public boolean isNew() {
        return this.status == ClientStatus.NEW;
    }

    /** @return {@code true} if the status is {@link ClientStatus#CONNECTED} */
    public boolean isConnected() {
        return this.status == ClientStatus.CONNECTED;
    }

    /**
     * Sets the status to {@link ClientStatus#ACTIVE}, logs the change and notifies the status handler.
     */
    public void setActive() {
        this.status = ClientStatus.ACTIVE;
        logger.info("已成功连接服务端！"+this.getUserId() + "与服务端连接状态设置为：[" + this.getStatus() + "],通道为：" + this.getChannel());
        statusHandle.afterActived(this);
    }

    /**
     * Sets the status to {@link ClientStatus#CLOSED} and notifies the status handler.
     */
    public void setClosed() {
        this.status = ClientStatus.CLOSED;
        statusHandle.afterClosed(this);
    }

    /**
     * Sets the status to {@link ClientStatus#RECONNECT} and notifies the status handler.
     */
    public void setReconnect() {
        this.status = ClientStatus.RECONNECT;
        statusHandle.afterReconnect(this);
    }

    /**
     * Sets the status to {@link ClientStatus#CONNECTED} and notifies the status handler.
     */
    public void setConnected() {
        this.status = ClientStatus.CONNECTED;
        statusHandle.afterConnected(this);
    }

    /**
     * Sets the status to {@link ClientStatus#NEW} and notifies the status handler.
     */
    public void setNew() {
        this.status = ClientStatus.NEW;
        statusHandle.afterNew(this);
    }

    /** @return the dispatcher used to execute packet handling */
    public Dispatcher getDispatcher() {
        return dispatcher;
    }

    /** @return the login request serial number (empty string by default) */
    public String getLrid() {
        return lrid;
    }

    /** @param lrid the login request serial number */
    public void setLrid(String lrid) {
        this.lrid = lrid;
    }

    /**
     * Assigns the user id; the assignment is the same one {@link #setUserId(String)} performs.
     *
     * @param userId the new user id
     */
    public void releaseUserId(String userId) {
        this.userId = userId;
    }

    /** @param lastAccessTime the new last-access time */
    public void setLastAccessTime(int lastAccessTime) {
        this.lastAccessTime = lastAccessTime;
    }

    /** @return the wrapped socket channel */
    public SocketChannel getChannel() {
        return channel;
    }

    /** @return the token associated with this connection */
    public String getToken() {
        return token;
    }

    /** @return the id of the user this connection belongs to */
    public String getUserId() {
        return userId;
    }

    /** @return the last-access time */
    public int getLastAccessTime() {
        return lastAccessTime;
    }

    /** @param token the token to associate with this connection */
    public void setToken(String token) {
        this.token = token;
    }

    /** @param userId the id of the user this connection belongs to */
    public void setUserId(String userId) {
        this.userId = userId;
    }

    /** @return the selection key registered for this connection, or {@code null} if none */
    public SelectionKey getKey() {
        return key;
    }

    /** @param key the selection key registered for this connection */
    public void setKey(SelectionKey key) {
        this.key = key;
    }

    /**
     * Cancels the selection key (if any), closes the channel if it is still open, then marks
     * this socket {@link ClientStatus#CLOSED} and notifies the status handler.
     *
     * <p>An {@link IOException} raised while closing the channel is logged, not propagated.
     */
    @Override
    public void close() {
        if (key != null) {
            key.cancel();
            key = null;
        }
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException e) {
                // Pass the throwable as well so the stack trace is not lost.
                logger.error("channel close error:" + e.getMessage(), e);
            }
        }
        setClosed();
    }

    /**
     * Writes queued buffers to the key's channel until the queue is empty or the channel
     * accepts only part of a buffer.
     *
     * <p>When the queue is empty the key's interest set is switched to {@code OP_READ} and the
     * selector is woken up. When a buffer is only partially written, the position reached is
     * remembered and the method returns so the rest can be written on a later call.
     *
     * @throws Exception whatever the channel write throws; the failing buffer is removed from
     *                   the queue before the exception is rethrown
     */
    public void doWrite() throws Exception {
        SocketChannel channel = (SocketChannel) key.channel();
        synchronized (writeQ) {
            while (true) {
                ByteBuffer byteBuffer = writeQ.peek();
                if (byteBuffer == null) {
                    key.interestOps(SelectionKey.OP_READ);
                    key.selector().wakeup();
                    break;
                }
                try {
                    // If the previous write stopped part-way, resume from where it stopped.
                    if (writePos > 0) {
                        byteBuffer.position(writePos);
                    }
                    channel.write(byteBuffer);
                    // Buffer not fully written: remember the position and stop the loop.
                    if (byteBuffer.remaining() > 0) {
                        writePos = byteBuffer.position();
                        break;
                    } else {
                        writePos = 0;
                        writeQ.remove();
                    }
                } catch (Exception e) {
                    // The failed buffer is discarded, so its partial-write offset must be
                    // discarded too; otherwise it would be applied to the next queued buffer.
                    writePos = 0;
                    writeQ.remove();
                    throw e;
                }
            }
        }
    }

    /**
     * Serialises an order into a new buffer, queues it for writing and registers write interest
     * on the selection key.
     *
     * <p>If the buffer has no remaining bytes after the adapter has filled it, nothing is queued
     * and an error is logged. Any exception raised along the way is logged and the socket is
     * closed.
     *
     * @param iOrder the order to send
     */
    public void write(IOrder iOrder) {
        try {
            // Fresh buffer that receives the serialised order.
            ByteBuffer byteBuffer = ByteBuffer.allocate(WRITE_BLOCK_SIZE);
            ioAdapter.analyWrite(iOrder, byteBuffer);
            if (byteBuffer.remaining() > 0) {
                this.writeQ.offer(byteBuffer);
            } else {
                logger.error("bytebuffer null error!");
            }
            ConcurrentGather.increasingWrite();
            // doWrite() holds the writeQ monitor while it checks for an empty queue and switches
            // the key back to OP_READ. Setting OP_WRITE under the same monitor, after the offer,
            // prevents that switch from overwriting this registration and stranding the buffer.
            // NOTE(review): this replaces the interest set, so OP_READ is dropped until the
            // queue drains; confirm that is intended.
            synchronized (writeQ) {
                this.key.interestOps(SelectionKey.OP_WRITE);
            }
            this.key.selector().wakeup();
        } catch (Exception e) {
            logger.error("write error :" + e.getMessage(), e);
            close();
        }
    }

    /**
     * Reads everything currently available from the channel, appends it to the accumulation
     * buffer, then parses and dispatches every complete message found there.
     *
     * <p>If the channel reports end-of-stream the socket is closed and nothing is parsed.
     * Bytes that do not yet form a complete message are kept for the next call.
     *
     * @throws Exception if reading, parsing or decoding fails
     */
    public void doRead() throws Exception {
        if (readBuffer == null) {
            readBuffer = IoBuffer.allocate(READ_BUFFER_SIZE);
        }
        ByteBuffer data = ReadBufferPool.getBuffer();
        try {
            int n;
            while ((n = channel.read(data)) > 0) {
                data.flip();
                // Index into the backing array must include arrayOffset(), which is non-zero
                // when the buffer is a slice of a larger array.
                readBuffer.writeBytes(data.array(), data.arrayOffset() + data.position(), data.remaining());
                data.clear();
            }

            if (n < 0) {
                logger.info("User <" + ((NSocket) (key.attachment())).getUserId() + "> read error,is set Closed!");
                close();
                return;
            }

            IoBuffer tempBuf = readBuffer.duplicate().flip();
            Object msg;
            while (true) {
                // Mark before each parse attempt so an incomplete message can be rewound.
                tempBuf.mark();
                if (tempBuf.remaining() > 0) {
                    msg = ioAdapter.analyRead(tempBuf);
                } else {
                    msg = null;
                }
                if (msg == null) {
                    // No complete message: rewind and keep only the unparsed tail.
                    tempBuf.reset();
                    readBuffer = resetIoBuffer(tempBuf);
                    break;
                }
                asyncHandlePacket(ioAdapter.decode((String) msg));
            }
        } finally {
            ReadBufferPool.freeBuffer(data);
        }
    }

    /**
     * Copies the unread remainder of {@code buffer} into a new buffer whose position is set to
     * the number of bytes copied.
     *
     * @param buffer the buffer whose remaining bytes should be kept; may be {@code null}
     * @return a new buffer holding the remaining bytes, or {@code null} if {@code buffer} is
     *         {@code null} or has nothing remaining
     */
    protected IoBuffer resetIoBuffer(IoBuffer buffer) {
        IoBuffer newBuffer = null;

        if (buffer != null && buffer.remaining() > 0) {
            int len = buffer.remaining();
            byte[] bb = new byte[len];
            buffer.readBytes(bb);
            newBuffer = IoBuffer.wrap(bb);
            newBuffer.position(len);
        }

        return newBuffer;
    }

    /**
     * Parses a packet string and hands the resulting packet to the dispatcher's read handling.
     *
     * @param strPacket the packet string to process
     */
    public void asyncHandlePacket(String strPacket) {
        IPacket packet = Packet.analysePacket(strPacket);
        ConcurrentGather.increasingRead();
        dispatcher.execReadHandle(new PacketHandleThread(packet, key, dispatcher));
    }

    /** @return the adapter used for parsing reads and serialising writes */
    public IOAdapter getIoAdapter() {
        return ioAdapter;
    }

    /**
     * Replaces the handler that is notified of status changes.
     *
     * @param statusHandle the new status handler
     */
    public void regStatusHandle(ISocketStatusHandle statusHandle) {
        this.statusHandle = statusHandle;
    }

    /**
     * Re-points this instance at a new channel, dispatcher, adapter, selection key and status
     * handler. Other state (status, queued writes, buffered reads) is left as it is.
     *
     * @param channel            the new socket channel
     * @param dispatcher         the new dispatcher
     * @param ioAdapter          the new adapter
     * @param key                the new selection key
     * @param socketStatusHandle the new status handler
     */
    public void reCreate(SocketChannel channel, Dispatcher dispatcher, IOAdapter ioAdapter, SelectionKey key, ISocketStatusHandle socketStatusHandle) {
        this.channel = channel;
        this.dispatcher = dispatcher;
        this.ioAdapter = ioAdapter;
        this.key = key;
        this.statusHandle = socketStatusHandle;
    }

}
